package com.ideal.service.hdfs;

import com.ideal.service.Utils;
import com.ideal.tools.db.MysqlDBUtils;
import com.ideal.tools.ssh.common.CommonTools;
import com.ideal.tools.ssh.common.OperationMarket;
import com.ideal.tools.ssh.common.PropertyBox;
import com.ideal.tools.ssh.common.PropertyDictory;
import com.ideal.tools.ssh.context.ClusterContext;
import com.ideal.tools.ssh.entity.ContextResult;
import com.ideal.tools.ssh.entity.LinuxMachine;
import com.ideal.tools.ssh.result.LinuxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Created by CC on 2016/7/25.
 */
public class HDFSService {

    // Prefix of the generated HDFS metadata file name
    public static final String HDFS_INFO_FILENAME = "hdfsMetaData";
    // Absolute path of the generated HDFS metadata file for the current refresh run
    private String filePath;
    private static final Logger logger = LoggerFactory.getLogger(HDFSService.class);

    /**
     * Refreshes HDFS metadata:
     * 1. load the latest HDFS info
     *    1.1 generate the HDFS listing file on the WebAPP machine
     *    1.2 parse the listing file
     * 2. read the current HDFS info from the database
     * 3. read the clusterUser info from the database
     * 4. reconcile the two and persist the differences
     *
     * @param context cluster execution context; a fresh empty context is created when null
     */
    public void refreshHDFS(ClusterContext context) {
        if (null == context) {
            context = new ClusterContext(new HashMap<String, Object>());
        }

        String initPath = PropertyBox.getVal(PropertyDictory.WEBAPP_INIT_PATH, "");
        if (!initPath.endsWith("/")) {
            initPath += "/";
        }
        String fileName = HDFS_INFO_FILENAME + CommonTools.getCurrentDate() + ".txt";
        filePath = initPath + fileName;
        // For local testing only:
        //filePath = "d:/hdfsMetaData20160727192934.txt";

        // Generate the HDFS listing file on the remote machine
        loadNewHDFS(context);
        // Parse the generated HDFS listing file
        List<String> newHdfsInfo = CommonTools.readFile2List(filePath);
        // Current HDFS info from the database
        List<Map<String, Object>> dbHdfsInfo = selectDbHDFS();
        // clusterUser info from the database
        Map<String, String> clusterUserMap = selectClusterUserMap();
        // Reconcile and persist the comparison result
        handleCompareResult(newHdfsInfo, dbHdfsInfo, clusterUserMap, context);
    }

    /**
     * Generates the HDFS listing file by running an {@code hadoop fs -ls | awk}
     * pipeline on the (single) WebAPP machine and redirecting its output to
     * {@link #filePath}.
     *
     * @param context cluster execution context holding the machine list
     */
    private void loadNewHDFS(ClusterContext context) {
        String cmd = "sudo -u hdfs hadoop fs -ls /user/*/*/|" +
                "awk '{arrlen=split($8,arr,\"/\");if(arrlen>=5 && substr($1,1,1)==\"d\") print arr[3] \" \" arr[4] \" \" $8 \" \" $1\" \"$6\":\"$7\" \"$3\" \"$4}'" +
                " > " + filePath;
        List<LinuxMachine> machineList = context.getOriginalList();
        List<LinuxMachine> finalMachines = CommonTools.getMachineListByType(machineList, context, LinuxMachine.MachineType.WebAPP);
        // Guard: without a WebAPP machine the listing file cannot be generated
        // (previously an unchecked get(0) could throw IndexOutOfBoundsException).
        if (null == finalMachines || finalMachines.isEmpty()) {
            logger.error("No WebAPP machine found in context; cannot generate HDFS metadata file {}", filePath);
            return;
        }
        // There is only ever one WebAPP machine, so the first entry is enough
        finalMachines.get(0).initOperation(OperationMarket.ExeOneShellCMD(cmd));
        context.setMachineList(finalMachines);
        context.doTheThing();
        context.printResult();
    }


    /**
     * Reads the current HDFS metadata rows from the database.
     *
     * @return rows of meta_hdfs_info_bak (id, hdfsPath, clusterUserId, hdfsGroup, hdfsOwner, hdfsPerm)
     */
    private List<Map<String, Object>> selectDbHDFS() {
        String sqlStr = "SELECT id,hdfsPath,clusterUserId,hdfsGroup,hdfsOwner,hdfsPerm from meta_hdfs_info_bak";
        return MysqlDBUtils.queryWebDB(sqlStr);
    }

    /**
     * Reads the clusterUser table and builds a lookup map.
     *
     * @return map of userName -> id (empty when the table has no rows)
     */
    private Map<String, String> selectClusterUserMap() {
        Map<String, String> clusterUserMap = new HashMap<String, String>();
        String clusterSql = "select id,userName from cluster_user";
        List<Map<String, Object>> list = MysqlDBUtils.queryWebDB(clusterSql);
        if (null != list) {
            for (Map<String, Object> user : list) {
                // NOTE(review): keys are read upper-cased ("USERNAME"/"ID") — presumably
                // MysqlDBUtils upper-cases column labels; verify against its implementation.
                Object userName = user.get("USERNAME");
                Object id = user.get("ID");
                if (null == userName || null == id) {
                    // Skip malformed rows instead of throwing an NPE on toString()
                    logger.warn("Skipping cluster_user row with missing USERNAME/ID: {}", user);
                    continue;
                }
                clusterUserMap.put(userName.toString(), id.toString());
            }
        }
        return clusterUserMap;
    }

    /**
     * Processes the comparison result:
     * 1.1 drop entries that cannot be matched to a cluster user
     * 1.2 insert paths that are new, delete database rows no longer present
     * and record a success/failure result on the context.
     *
     * @param newHdfsInfo    lines of the freshly generated HDFS listing file
     * @param dbHdfsInfo     current meta_hdfs_info_bak rows
     * @param clusterUserMap userName -> id lookup from cluster_user
     * @param context        cluster execution context receiving the result
     */
    private void handleCompareResult(List<String> newHdfsInfo, List<Map<String, Object>> dbHdfsInfo, Map<String, String> clusterUserMap, ClusterContext context) {
        List<LinuxResult> resultList = new ArrayList<LinuxResult>(); // collected run results
        // Abort when the new HDFS data or the clusterUser data is empty
        if (null == newHdfsInfo || newHdfsInfo.size() == 0 || null == clusterUserMap || clusterUserMap.size() == 0) {
            // Assemble the result parameters the context must return
            LinuxResult tmpResult = new LinuxResult(null, null, LinuxResult.DEFAULT_FAILD_CODE);
            tmpResult.setNote("刷新失败！");
            resultList.add(tmpResult);
            context.getContextResult().setConextResult("", resultList);
            return;
        }

        // Valid entries only; key: hdfsPath, value: the parsed column array
        Map<String, String[]> finalHdfsMap = new HashMap<String, String[]>();
        for (String curStr : newHdfsInfo) {
            // Raw line:   A001 public /user/A001/public/1111 drwxr-x--- 2016-01-18:15:51 A001 A001_1111
            // DB columns: clusterUserId=A001, properties=public, hdfsPath=/user/A001/public/1111,
            //             hdfsPerm=drwxr-x---, createTime=2016-01-18:15:51, hdfsOwner=A001, hdfsGroup=A001_1111
            // The raw line has no "note" value; append a dummy token so the split
            // array has an 8th slot, which is overwritten below.
            curStr += " ''";
            String[] tem = curStr.split(" ");
            String newHdfsPath = tem[2];
            if (clusterUserMap.get(tem[0]) != null) {
                // Resolve userName -> clusterUserId
                tem[0] = clusterUserMap.get(tem[0]);
                // properties DB format: 1 (private), 0 (public)
                tem[1] = tem[1].equals("private") ? "1" : "0";
                // createTime DB format: epoch milliseconds as string
                tem[4] = formateDate(tem[4]) + "";
                // note is mandatory in the DB; use a blank placeholder
                tem[7] = " ";
                finalHdfsMap.put(newHdfsPath, tem);
            }
        }

        // Rows to delete (each element is a single-id parameter array)
        List<String[]> deleteDbList = new ArrayList<String[]>();
        if (null != dbHdfsInfo && dbHdfsInfo.size() > 0) {
            for (Map<String, Object> curMap : dbHdfsInfo) {
                String dbHdfsPath = curMap.get("hdfsPath").toString();
                // A DB row matched by the new data needs no change -> remove it from the
                // insert set; an unmatched DB row is stale and must be deleted.
                if (finalHdfsMap.get(dbHdfsPath) != null) {
                    finalHdfsMap.remove(dbHdfsPath);
                } else {
                    deleteDbList.add(new String[]{curMap.get("id").toString()});
                }
            }
        }

        // Batch insert the remaining (new) entries
        if (!finalHdfsMap.isEmpty()) {
            Object[][] insertParam = new Object[finalHdfsMap.size()][];
            int count = 0;
            for (Map.Entry<String, String[]> entry : finalHdfsMap.entrySet()) {
                insertParam[count] = entry.getValue();
                count++;
            }
            String insertSql = " insert into meta_hdfs_info_bak " +
                    "(clusterUserId,properties,hdfsPath,hdfsPerm,createTime,hdfsOwner,hdfsGroup,note) values" +
                    " (?,?,?,?,?,?,?,?)";
            MysqlDBUtils.batchWebDB(insertSql, insertParam);
        }
        // Batch delete the stale rows
        if (!deleteDbList.isEmpty()) {
            Object[][] deleteParam = deleteDbList.toArray(new String[deleteDbList.size()][]);
            String delectSql = "delete from meta_hdfs_info_bak where id = ?";
            MysqlDBUtils.batchWebDB(delectSql, deleteParam);
        }

        // Assemble the result parameters the context must return
        LinuxResult tmpResult = new LinuxResult(null, null, LinuxResult.DEFAULT_SUCCESS_CODE);
        tmpResult.setNote("刷新成功！");
        resultList.add(tmpResult);
        ContextResult contextResult = context.getContextResult();
        if (null == contextResult) {
            contextResult = new ContextResult();
        }
        // Bug fix: the success result was previously only recorded when the context
        // had no ContextResult yet; record it unconditionally, mirroring the failure path.
        contextResult.setConextResult("", resultList);
    }

    /**
     * Converts a timestamp such as "2016-01-18:15:51" to epoch milliseconds.
     *
     * @param dateStr timestamp in "yyyy-MM-dd:HH:mm" format
     * @return epoch milliseconds, or 0 when the string cannot be parsed
     */
    private long formateDate(String dateStr) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd:HH:mm");
        try {
            return simpleDateFormat.parse(dateStr).getTime();
        } catch (ParseException e) {
            // Bug fix: previously fell through to date.getTime() on a null date -> NPE.
            logger.error("Unparseable date '{}', storing createTime=0", dateStr, e);
            return 0L;
        }
    }

    public static void main(String[] args) {
        HDFSService hdfs = new HDFSService();
        hdfs.refreshHDFS(null);
    }
}
